#!/usr/bin/env -S uv run --script --frozen --only-group release-tools  # -*-python-*-

import json
import re
import time
import urllib.parse
from collections import defaultdict
from dataclasses import dataclass, field
from functools import cache
from typing import Annotated, Any

import requests
import typer
from github import Auth, Github
from rich.console import Console
from rich.progress import Progress
from typing_extensions import override


def encode_hash_component(s: str) -> str:
    """Encode *s* for use as one component of a ``#narrow`` URL fragment.

    The text is first percent-encoded (leaving ``*`` untouched), and then
    every ``%`` becomes ``.`` while a literal ``.`` becomes ``.2E``, so
    that, for example, a space ends up as ``.20``.
    """
    percent_encoded = urllib.parse.quote(s, safe="*")
    # A translation table rewrites each character independently in one
    # pass, so the "." produced for a "%" is never re-escaped to ".2E".
    substitutions = str.maketrans(
        {
            ".": ".2E",
            "%": ".",
            "(": ".28",
            ")": ".29",
        }
    )
    return percent_encoded.translate(substitutions)


@cache
def search_czo_for_number(prefix: str, number: int) -> frozenset[str]:
    """Search chat.zulip.org for messages mentioning ``#{prefix}{number}``.

    Only the newest 100 matching messages in web-public channels are
    requested.  The return value is the set of topic URLs those messages
    belong to; it is a frozenset because results are memoized per
    ``(prefix, number)`` by ``@cache`` and must not be mutated by callers.

    When the server answers 429, this sleeps for the ``Retry-After``
    period (plus one second) and tries again.  Any other request failure,
    including connection errors and timeouts, propagates to the caller.
    """
    params = {
        "anchor": "newest",
        "num_before": "100",
        "num_after": "0",
        "narrow": json.dumps(
            [
                {"negated": False, "operator": "search", "operand": f'"#{prefix}{number}"'},
                {"negated": False, "operator": "channels", "operand": "web-public"},
            ]
        ),
    }

    while True:
        try:
            response = requests.get(
                "https://chat.zulip.org/json/messages", params=params, timeout=30
            )
            response.raise_for_status()
            break
        except requests.exceptions.RequestException as e:
            # Compare against None explicitly: a `requests.Response` is
            # falsy whenever its status is 4xx/5xx, so a truthiness check
            # would reject exactly the 429 responses we want to retry.
            # Connection errors and timeouts carry no response at all.
            if e.response is None or e.response.status_code != 429:
                raise
            # Tolerate a fractional Retry-After value; truncating and
            # adding one second always waits at least as long as asked.
            retry_after = int(float(e.response.headers.get("Retry-After", "1"))) + 1
            time.sleep(retry_after)

    messages = response.json().get("messages", [])

    # Extract unique topic URLs
    urls = set()
    for msg in messages:
        stream_id = msg.get("stream_id")
        display_recipient = msg.get("display_recipient")
        subject = msg.get("subject")

        assert stream_id
        assert display_recipient

        encoded_recipient = encode_hash_component(display_recipient.replace(" ", "-"))
        encoded_subject = encode_hash_component(subject)
        url = f"https://chat.zulip.org/#narrow/channel/{stream_id}-{encoded_recipient}/topic/{encoded_subject}"
        urls.add(url)

    return frozenset(urls)


@dataclass
class Issue:
    number: int
    title: str
    czo_urls: set[str] = field(default_factory=set)
    closed_by_prs: list["PullRequest"] = field(default_factory=list)
    duplicate_issue_ids: set[int] = field(default_factory=set)

    @override
    def __hash__(self) -> int:
        return hash(self.number)

    @override
    def __eq__(self, other: object) -> bool:
        return isinstance(other, Issue) and self.number == other.number


@dataclass
class PullRequest:
    number: int
    title: str
    czo_urls: set[str] = field(default_factory=set)

    @override
    def __hash__(self) -> int:
        return hash(self.number)

    @override
    def __eq__(self, other: object) -> bool:
        return isinstance(other, PullRequest) and self.number == other.number


class CommitRangeAnalyzer:
    """Find the issues closed by the commits in a range of a ``zulip/<repo>`` repository.

    For every commit, GitHub's GraphQL API is asked for the associated pull
    requests and the issues those close.  chat.zulip.org links are gathered
    from the bodies and comments of the issues and pull requests, and from
    a chat.zulip.org search for the issue or pull request number.
    """

    # Pull requests associated with one commit, the issues each of them
    # closes, and the marked-as-duplicate events on those issues.
    COMMIT_PRS_QUERY = """
    query($oid: GitObjectID!, $repo: String!) {
      repository(owner: "zulip", name: $repo) {
        object(oid: $oid) {
          ... on Commit {
            messageBody
            associatedPullRequests(first: 10) {
              nodes {
                number
                title
                url
                body
                comments(first: 100) {
                  nodes {
                    body
                  }
                }
                closingIssuesReferences(first: 50) {
                  nodes {
                    number
                    title
                    url
                    body
                    comments(first: 100) {
                      nodes {
                        body
                      }
                    }
                    timelineItems(first:100, itemTypes:MARKED_AS_DUPLICATE_EVENT) {
                      ... on IssueTimelineItemsConnection {
                        nodes {
                          ... on MarkedAsDuplicateEvent {
                            duplicate {
                              ... on Issue {
                                number
                              }
                            }
                          }
                        }
                      }
                    }
                  }
                }
              }
            }
          }
        }
      }
    }
    """

    # One issue by number, with the fields `_issue_from_node` reads.
    ISSUE_QUERY = """
    query($number: Int!, $repo: String!) {
      repository(owner: "zulip", name: $repo) {
        issue(number: $number) {
          number
          title
          body
          comments(first: 100) {
            nodes {
              body
            }
          }
          timelineItems(first:100, itemTypes:MARKED_AS_DUPLICATE_EVENT) {
            ... on IssueTimelineItemsConnection {
              nodes {
                ... on MarkedAsDuplicateEvent {
                  duplicate {
                    ... on Issue {
                      number
                    }
                  }
                }
              }
            }
          }
        }
      }
    }
    """

    def __init__(self, token: str, reponame: str, czo_issue_prefix: str) -> None:
        """Create an analyzer for the ``zulip/<reponame>`` repository.

        Args:
            token: GitHub access token used for all API requests.
            reponame: Repository name within the ``zulip`` organization.
            czo_issue_prefix: Text placed between ``#`` and the number when
                searching chat.zulip.org for an issue or pull request.
        """
        self.github = Github(auth=Auth.Token(token))
        self.reponame = reponame
        self.czo_issue_prefix = czo_issue_prefix

    @staticmethod
    def _extract_czo_urls(text: str | None) -> set[str]:
        """Return the normalized chat.zulip.org topic URLs found in *text*.

        Links which do not point at a topic are ignored.  Empty or missing
        text yields an empty set.
        """
        if not text:
            return set()

        matches = re.findall(r"https://chat\.zulip\.org/[^\s\)\]\>]+", text)

        urls = set()
        for url in matches:
            # We strip off and remove /with/... and /near/... to
            # reduce the number of unique links which are generated.
            parsed_url = re.match(r"(.*)/topic/([^/]+)(/(near|with)/.*)?$", url)
            if not parsed_url:
                continue
            urls.add(
                parsed_url[1]
                + "/topic/"
                # Normalize the topic by decoding and re-encoding
                + encode_hash_component(urllib.parse.unquote(parsed_url[2].replace(".", "%")))
            )

        return urls

    @staticmethod
    def _extract_issue_numbers(reponame: str, text: str | None) -> set[int]:
        """Return the issue numbers which *text* closes via a GitHub keyword.

        Both bare ``#123`` references and ``zulip/<reponame>#123``
        references are recognized; matching is case-insensitive.
        """
        if not text:
            return set()

        # https://docs.github.com/en/issues/tracking-your-work-with-issues/using-issues/linking-a-pull-request-to-an-issue#linking-a-pull-request-to-an-issue-using-a-keyword
        # The repository name is escaped so that it can only ever match literally.
        pattern = (
            r"\b(?:close[sd]?|fix(?:es|ed)?|resolve[sd]?):?\s+"
            rf"(?:zulip/{re.escape(reponame)})?#(\d+)\b"
        )
        matches = re.findall(pattern, text, re.IGNORECASE)
        return {int(num) for num in matches}

    def _issue_from_node(
        self,
        issue_node: dict[str, Any],
        console: Console,
        _visited: set[int] | None = None,
    ) -> Issue:
        """Build an `Issue` from a GraphQL issue node, following duplicates.

        chat.zulip.org links are collected from the issue body, its
        comments, and a chat.zulip.org search for the issue number.  Every
        issue named by a marked-as-duplicate event is fetched as well, and
        its links and its own duplicates are merged into the result.

        Args:
            issue_node: An issue node shaped like those requested by
                ``ISSUE_QUERY``.
            console: Console used to report failed duplicate fetches.
            _visited: Internal; the issue numbers already expanded during
                the current traversal.  Callers outside this class should
                leave it unset.
        """
        visited: set[int] = set() if _visited is None else _visited

        issue = Issue(
            number=issue_node["number"],
            title=issue_node["title"].strip(),
        )
        visited.add(issue.number)

        issue_comments = issue_node.get("comments", {}).get("nodes", [])
        for node in [issue_node, *issue_comments]:
            issue.czo_urls.update(self._extract_czo_urls(node.get("body")))
        issue.czo_urls.update(search_czo_for_number(self.czo_issue_prefix, issue.number))

        direct_duplicates: set[int] = set()
        for event in (issue_node.get("timelineItems") or {}).get("nodes") or []:
            # The `... on Issue` fragment leaves `duplicate` empty when the
            # duplicate is not an issue; such events carry no number.
            duplicate = (event or {}).get("duplicate") or {}
            if "number" in duplicate:
                direct_duplicates.add(duplicate["number"])

        issue.duplicate_issue_ids = set(direct_duplicates)
        # Iterate over the separate `direct_duplicates` set, because
        # `issue.duplicate_issue_ids` grows inside the loop and a set must
        # not change size while it is being iterated.
        for issue_id in sorted(direct_duplicates):
            # Skip anything already expanded in this traversal so that a
            # cycle of duplicate markers (or an event naming this very
            # issue) cannot recurse without bound.
            if issue_id in visited:
                continue
            duplicate_info = self._fetch_issue(issue_id, console, visited)
            if duplicate_info is None:
                continue
            issue.czo_urls.update(duplicate_info.czo_urls)
            issue.duplicate_issue_ids.update(duplicate_info.duplicate_issue_ids)

        # An issue is never reported as a duplicate of itself.
        issue.duplicate_issue_ids.discard(issue.number)
        return issue

    def _fetch_issue(
        self, number: int, console: Console, _visited: set[int] | None = None
    ) -> Issue | None:
        """Fetch issue metadata from GitHub.

        This is best-effort: any failure is logged to *console* and
        reported as ``None`` rather than raised, so that one unreadable
        issue does not abort the whole run.

        Args:
            number: Issue number within the repository.
            console: Console which receives failure messages.
            _visited: Internal; forwarded to `_issue_from_node`.
        """
        try:
            _, result = self.github.requester.graphql_query(
                self.ISSUE_QUERY, {"number": number, "repo": self.reponame}
            )

            if "errors" in result:
                # Console.log does not apply printf-style formatting; it
                # prints each positional object in turn.
                console.log(f"Failed to fetch zulip/{self.reponame}#{number}:", result["errors"])
                return None

            data = result.get("data") or {}
            issue_node = (data.get("repository") or {}).get("issue")

            if not issue_node:
                return None

            return self._issue_from_node(issue_node, console, _visited)

        except Exception as e:
            # markup=False: the exception text is arbitrary and must not be
            # interpreted as console markup.
            console.log(f"Failed to fetch zulip/{self.reponame}#{number}: {e!r}", markup=False)
            return None

    def get_issues_for_commit(
        self, commit_sha: str, console: Console
    ) -> tuple[dict[Issue, set[PullRequest]], set[int]]:
        """Find the issues closed by one commit.

        An issue counts as closed by a pull request when GitHub lists it
        among that pull request's closing references, or when the commit
        message closes it with a keyword.  Issues closed by the message of
        a commit which has no pull request are returned with an empty set
        of pull requests.

        Args:
            commit_sha: Object id of the commit to look up.
            console: Console which receives failure messages.

        Returns:
            A mapping from each closed issue to the pull requests closing
            it, and the numbers of all pull requests associated with the
            commit.  Both are empty when the commit cannot be found.

        Raises:
            RuntimeError: If the GraphQL response contains errors.
        """
        _, result = self.github.requester.graphql_query(
            self.COMMIT_PRS_QUERY, {"oid": commit_sha, "repo": self.reponame}
        )

        if "errors" in result:
            error_messages = [e.get("message", str(e)) for e in result["errors"]]
            raise RuntimeError(f"GraphQL errors: {', '.join(error_messages)}")

        data = result.get("data") or {}
        commit_obj = (data.get("repository") or {}).get("object")
        if not commit_obj:
            return {}, set()

        commit_message = commit_obj.get("messageBody", "")
        pr_nodes = commit_obj.get("associatedPullRequests", {}).get("nodes", [])

        # The commit message is the same for every pull request, so parse
        # it once and remember each fetched issue (including failures)
        # rather than asking GitHub again for every pull request.
        commit_issue_numbers = self._extract_issue_numbers(self.reponame, commit_message)
        fetched_commit_issues: dict[int, Issue | None] = {}

        results: dict[Issue, set[PullRequest]] = defaultdict(set)
        all_seen_prs: set[int] = set()
        for pr_node in pr_nodes:
            pr = PullRequest(
                number=pr_node["number"],
                title=pr_node["title"].strip(),
            )
            all_seen_prs.add(pr.number)
            pr_comments = pr_node.get("comments", {}).get("nodes", [])
            for node in [pr_node, *pr_comments]:
                pr.czo_urls.update(self._extract_czo_urls(node.get("body")))
            pr.czo_urls.update(search_czo_for_number(self.czo_issue_prefix, pr.number))

            # Get issues from PR metadata
            issues_by_number: dict[int, Issue] = {}
            issue_nodes = pr_node.get("closingIssuesReferences", {}).get("nodes", [])
            for issue_node in issue_nodes:
                if issue_node is None:
                    continue

                issue = self._issue_from_node(issue_node, console)
                issues_by_number[issue.number] = issue

            # Extract additional issue numbers from commit message
            for issue_num in commit_issue_numbers:
                if issue_num in issues_by_number:
                    continue
                if issue_num not in fetched_commit_issues:
                    fetched_commit_issues[issue_num] = self._fetch_issue(issue_num, console)
                maybe_issue = fetched_commit_issues[issue_num]
                if maybe_issue is not None:
                    issues_by_number[issue_num] = maybe_issue

            for issue in issues_by_number.values():
                results[issue].add(pr)

        if not pr_nodes:
            for issue_num in commit_issue_numbers:
                maybe_issue = self._fetch_issue(issue_num, console)
                if maybe_issue is None:
                    continue
                # Record the issue even though no pull request closes it.
                results.setdefault(maybe_issue, set())

        return results, all_seen_prs

    def analyze_range(self, base: str, head: str) -> list[Issue]:
        """Return the issues closed by the commits between *base* and *head*.

        The commit list comes from GitHub's comparison of the two refs.
        Each returned issue has ``closed_by_prs`` filled in (sorted by
        pull request number) and its ``czo_urls`` extended with the links
        of those pull requests.  The result is sorted by issue number.
        Progress and a summary are logged to stderr.
        """
        console = Console(stderr=True, log_path=False)

        repository = self.github.get_repo(f"zulip/{self.reponame}")
        comparison = repository.compare(base, head)
        commit_shas = [commit.sha for commit in comparison.commits]
        console.log(f"Found {len(commit_shas)} commits")

        all_seen_prs: set[int] = set()
        issue_to_prs: dict[Issue, set[PullRequest]] = defaultdict(set)
        with Progress(console=console) as progress:
            task = progress.add_task("Processing commits...", total=len(commit_shas))
            for sha in commit_shas:
                progress.console.log(f"Processing {sha}")

                this_issue_to_prs, this_seen_prs = self.get_issues_for_commit(sha, console)
                # Issues compare equal by number, so the same issue reached
                # through several commits accumulates under a single key.
                for issue, prs in this_issue_to_prs.items():
                    issue_to_prs[issue].update(prs)
                all_seen_prs.update(this_seen_prs)
                progress.advance(task)

        for issue, prs in issue_to_prs.items():
            for pr in prs:
                issue.czo_urls.update(pr.czo_urls)

            issue.closed_by_prs = sorted(prs, key=lambda p: p.number)

        unique_prs = len({pr for prs in issue_to_prs.values() for pr in prs})
        console.log(
            f"Found {len(all_seen_prs)} unique PRs, of which {unique_prs} PRs closed {len(issue_to_prs)} issues",
        )

        return sorted(issue_to_prs.keys(), key=lambda x: x.number)


def validate_github_token(value: str) -> str:
    """Return *value* unchanged if it has a known GitHub token prefix.

    Accepted are values starting with ``github_``, or with ``gh`` followed
    by one of ``p``, ``o``, ``u``, ``s``, ``r`` and an underscore.

    Raises:
        typer.BadParameter: If *value* has neither kind of prefix.
    """
    # https://docs.github.com/en/authentication/keeping-your-account-and-data-secure/about-authentication-to-github#githubs-token-formats
    has_known_prefix = (
        value.startswith("github_") or re.match(r"gh[pousr]_", value) is not None
    )
    if not has_known_prefix:
        raise typer.BadParameter("Github access tokens start with `github_`, or `gh`")
    return value


from enum import Enum


class ZulipRepo(str, Enum):
    # Repositories within the zulip GitHub organization which this tool
    # supports; each member's value is the repository name.
    zulip = "zulip"
    flutter = "zulip-flutter"


# Text placed between "#" and the number when an issue or pull request of
# each repository is searched for on chat.zulip.org and printed in the report.
repo_issue_prefixes: dict[ZulipRepo, str] = {
    ZulipRepo.zulip: "",
    ZulipRepo.flutter: "F",
}


def main(
    base_commit: Annotated[str, typer.Argument(help="Commit-ish, resolved on the server")],
    head_commit: Annotated[str, typer.Argument(help="Commit-ish, resolved on the server")],
    token: Annotated[
        str,
        typer.Option(
            metavar="TOKEN",
            envvar="GITHUB_TOKEN",
            show_envvar=True,
            callback=validate_github_token,
            help=(
                "Github access token; can be a fine-grained personal access token "
                "with read-only access to 'Public repositories'.  "
                "See https://github.com/settings/personal-access-tokens/new"
            ),
        ),
    ],
    repo: ZulipRepo = ZulipRepo.zulip,
) -> int:
    """Find issues which are closed in a commit range."""

    # Prints one Markdown section per closed issue to stdout: a heading,
    # the issue's duplicates (if any), the pull requests which closed it,
    # and the related chat.zulip.org links.  Always returns 0.
    lines = []
    prefix = repo_issue_prefixes[repo]
    analyzer = CommitRangeAnalyzer(token, repo.value, prefix)
    for issue in analyzer.analyze_range(base_commit, head_commit):
        lines.append(f"#### #{prefix}{issue.number}: {issue.title}")
        if issue.duplicate_issue_ids:
            # Sets iterate in no meaningful order; sort so that the report
            # is stable and reads in ascending issue number.
            lines.append(
                " - **Duplicates:** "
                + ", ".join(f"#{prefix}{number}" for number in sorted(issue.duplicate_issue_ids))
            )
        lines.extend(
            f" - **Closed by:** #{prefix}{pr.number}: {pr.title}" for pr in issue.closed_by_prs
        )
        lines.extend(f" - {czo_url}" for czo_url in sorted(issue.czo_urls))

        lines.append("")

    print("\n".join(lines))

    return 0


# Only when executed as a script: hand main() to typer, which runs it as
# the command-line entry point.
if __name__ == "__main__":
    typer.run(main)
